using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using HslCommunication.BasicFramework;
using HslCommunication.Core;
using HslCommunication.Core.Net;

namespace HslCommunication.MQTT
{
	/// <summary>
	/// 基于MQTT协议的同步访问的客户端程序，支持以同步的方式访问服务器的数据信息，并及时的反馈结果<br />
	/// The client program based on MQTT protocol for synchronous access supports synchronous access to the server's data information and timely feedback of results
	/// </summary>
	/// <example>
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttSyncClientSample.cs" region="Test" title="简单的实例化" />
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttSyncClientSample.cs" region="Test2" title="带用户名密码的实例化" />
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttSyncClientSample.cs" region="Test3" title="连接示例" />
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttSyncClientSample.cs" region="Test4" title="读取数据示例" />
	/// <code lang="cs" source="HslCommunication_Net45.Test\Documentation\Samples\MQTT\MqttSyncClientSample.cs" region="Test5" title="带进度报告示例" />
	/// </example>
	public class MqttSyncClient : NetworkDoubleBase
	{
		// Counter created as SoftIncrementCount(65535, 1) by every constructor and reset after each
		// successful connect handshake. Nothing else in this class reads its value.
		// NOTE(review): presumably intended as the MQTT packet-identifier source — confirm.
		private SoftIncrementCount incrementCount;

		// Connection settings used to build the connect command; exposed through ConnectionOptions.
		private MqttConnectionOptions connectionOptions;

		// Encoding used by ReadString/ReadStringAsync for payloads and replies; exposed through StringEncoding.
		private Encoding stringEncoding = Encoding.UTF8;

		/// <summary>
		/// Gets or sets the connection settings used when the client connects to the server.
		/// The connect handshake reads these settings each time it runs, so assign them before connecting.
		/// </summary>
		public MqttConnectionOptions ConnectionOptions
		{
			get { return connectionOptions; }
			set { connectionOptions = value; }
		}

		/// <summary>
		/// Gets or sets the text encoding used by the string-based read methods to encode the
		/// outgoing payload and decode the returned payload. The default is UTF-8.
		/// </summary>
		public Encoding StringEncoding
		{
			get { return stringEncoding; }
			set { stringEncoding = value; }
		}

		/// <summary>
		/// Creates a synchronous MQTT client from a prepared set of connection options.
		/// </summary>
		/// <param name="options">
		/// Connection settings. The instance is stored as <see cref="ConnectionOptions"/>, and its
		/// IP address, port and connect timeout are copied onto this client.
		/// </param>
		public MqttSyncClient(MqttConnectionOptions options)
		{
			// State owned by this class.
			connectionOptions = options;
			incrementCount = new SoftIncrementCount(65535L, 1L);

			// State inherited from the network base class.
			base.ByteTransform = new RegularByteTransform();
			IpAddress = options.IpAddress;
			Port = options.Port;
			ConnectTimeOut = options.ConnectTimeout;
		}

		/// <summary>
		/// Creates a synchronous MQTT client that targets the given address and port.
		/// A fresh <see cref="MqttConnectionOptions"/> carrying only these two values is created
		/// and stored as <see cref="ConnectionOptions"/>.
		/// </summary>
		/// <remarks>
		/// Unlike the options-based constructor, this overload does not assign the connect timeout.
		/// </remarks>
		/// <param name="ipAddress">IP address of the server.</param>
		/// <param name="port">Port number of the server.</param>
		public MqttSyncClient(string ipAddress, int port)
		{
			MqttConnectionOptions options = new MqttConnectionOptions();
			options.IpAddress = ipAddress;
			options.Port = port;
			connectionOptions = options;

			base.ByteTransform = new RegularByteTransform();
			IpAddress = ipAddress;
			Port = port;
			incrementCount = new SoftIncrementCount(65535L, 1L);
		}

		/// <summary>
		/// Creates a synchronous MQTT client that targets the given address and port.
		/// The address is converted with <c>ToString()</c> and construction is delegated to the
		/// string-based overload.
		/// </summary>
		/// <param name="ipAddress">IP address of the server.</param>
		/// <param name="port">Port number of the server.</param>
		public MqttSyncClient(IPAddress ipAddress, int port)
			: this(ipAddress.ToString(), port)
		{
		}

		/// <inheritdoc />
		/// <remarks>
		/// Runs the connect handshake on a freshly opened socket: builds the connect command from
		/// the current connection options, sends it, reads one reply and validates that reply.
		/// The socket is closed here only when the reply fails validation.
		/// </remarks>
		protected override OperateResult InitializationOnConnect(Socket socket)
		{
			// 1. Build the connect command.
			var connectCommand = MqttHelper.BuildConnectMqttCommand(connectionOptions, "HUSL");
			if (!connectCommand.IsSuccess)
			{
				return connectCommand;
			}

			// 2. Send it to the server.
			var sent = Send(socket, connectCommand.Content);
			if (!sent.IsSuccess)
			{
				return sent;
			}

			// 3. Read the server's answer.
			var ack = ReceiveMqttMessage(socket);
			if (!ack.IsSuccess)
			{
				return ack;
			}

			// 4. Validate the answer; a rejected connection is closed immediately.
			var verdict = MqttHelper.CheckConnectBack(ack.Content1, ack.Content2);
			if (!verdict.IsSuccess)
			{
				socket?.Close();
				return verdict;
			}

			incrementCount.ResetCurrentValue();
			return OperateResult.CreateSuccessResult();
		}

		/// <inheritdoc />
		/// <remarks>
		/// Asynchronous counterpart of the connect handshake: builds the connect command from the
		/// current connection options, sends it, awaits one reply and validates that reply.
		/// The socket is closed here only when the reply fails validation.
		/// </remarks>
		protected override async Task<OperateResult> InitializationOnConnectAsync(Socket socket)
		{
			// 1. Build the connect command.
			var connectCommand = MqttHelper.BuildConnectMqttCommand(connectionOptions, "HUSL");
			if (!connectCommand.IsSuccess)
			{
				return connectCommand;
			}

			// 2. Send it to the server.
			var sent = await SendAsync(socket, connectCommand.Content);
			if (!sent.IsSuccess)
			{
				return sent;
			}

			// 3. Read the server's answer.
			var ack = await ReceiveMqttMessageAsync(socket);
			if (!ack.IsSuccess)
			{
				return ack;
			}

			// 4. Validate the answer; a rejected connection is closed immediately.
			var verdict = MqttHelper.CheckConnectBack(ack.Content1, ack.Content2);
			if (!verdict.IsSuccess)
			{
				socket?.Close();
				return verdict;
			}

			incrementCount.ResetCurrentValue();
			return OperateResult.CreateSuccessResult();
		}

		/// <inheritdoc />
		/// <remarks>
		/// Forwards to the progress-aware exchange with all three progress callbacks disabled.
		/// </remarks>
		public override OperateResult<byte[]> ReadFromCoreServer(Socket socket, byte[] send)
			=> ReadMqttFromCoreServer(socket, send, null, null, null);

		/// <summary>
		/// Sends a request over the given socket and reads the reply, reporting progress on the way.
		/// </summary>
		/// <remarks>
		/// The exchange runs in three steps:
		/// 1. the request bytes are sent;
		/// 2. acknowledgement messages are read until the two 64-bit counters in the 16-byte payload
		///    are equal, each pair being forwarded to <paramref name="sendProgress"/>;
		/// 3. messages are read until one arrives whose control type (high nibble of the first byte)
		///    is not 15; messages of type 15 are forwarded to <paramref name="handleProgress"/>.
		/// A type-15 message that cannot be parsed yields a failed result instead of an exception
		/// when a handler is attached; without a handler it is skipped.
		/// </remarks>
		/// <param name="socket">Connected socket to use for the exchange.</param>
		/// <param name="send">Complete command bytes to send.</param>
		/// <param name="sendProgress">Optional callback receiving the two counters of each acknowledgement.</param>
		/// <param name="handleProgress">Optional callback receiving the topic and UTF-8 decoded payload of each type-15 message.</param>
		/// <param name="receiveProgress">Optional callback passed to the receive routine while reading step-3 messages.</param>
		/// <returns>The body of the final message on success; otherwise a failed result describing the first error.</returns>
		private OperateResult<byte[]> ReadMqttFromCoreServer(Socket socket, byte[] send, Action<long, long> sendProgress, Action<string, string> handleProgress, Action<long, long> receiveProgress)
		{
			OperateResult sendResult = Send(socket, send);
			if (!sendResult.IsSuccess)
			{
				return OperateResult.CreateFailedResult<byte[]>(sendResult);
			}

			// Step 2: wait until the acknowledged counter reaches the total.
			long already;
			long total;
			do
			{
				OperateResult<byte, byte[]> ackMessage = ReceiveMqttMessage(socket);
				if (!ackMessage.IsSuccess)
				{
					return OperateResult.CreateFailedResult<byte[]>(ackMessage);
				}
				OperateResult<string, byte[]> ack = MqttHelper.ExtraMqttReceiveData(ackMessage.Content1, ackMessage.Content2);
				if (!ack.IsSuccess)
				{
					return OperateResult.CreateFailedResult<byte[]>(ack);
				}
				// The acknowledgement payload must be exactly two 64-bit integers.
				if (ack.Content2.Length != 16)
				{
					return new OperateResult<byte[]>(StringResources.Language.ReceiveDataLengthTooShort);
				}
				already = BitConverter.ToInt64(ack.Content2, 0);
				total = BitConverter.ToInt64(ack.Content2, 8);
				sendProgress?.Invoke(already, total);
			}
			while (already != total);

			// Step 3: consume type-15 messages until the actual reply arrives.
			while (true)
			{
				OperateResult<byte, byte[]> reply = ReceiveMqttMessage(socket, receiveProgress);
				if (!reply.IsSuccess)
				{
					return OperateResult.CreateFailedResult<byte[]>(reply);
				}
				if (reply.Content1 >> 4 != 15)
				{
					return OperateResult.CreateSuccessResult(reply.Content2);
				}
				if (handleProgress == null)
				{
					// The parsed message is only ever consumed by the handler, so there is nothing to do.
					continue;
				}
				OperateResult<string, byte[]> progress = MqttHelper.ExtraMqttReceiveData(reply.Content1, reply.Content2);
				if (!progress.IsSuccess)
				{
					// Previously the unchecked result was decoded directly, which throws when its content is missing.
					return OperateResult.CreateFailedResult<byte[]>(progress);
				}
				handleProgress(progress.Content1, Encoding.UTF8.GetString(progress.Content2));
			}
		}

		/// <summary>
		/// Performs one complete request/reply exchange under the interaction lock: obtains a socket,
		/// runs the progress-aware exchange on it and records whether the socket is still usable.
		/// </summary>
		/// <remarks>
		/// The lock is released, and for non-persistent connections the socket is closed, in a single
		/// <c>finally</c> block. This guarantees exactly one release on every path and closes the
		/// short-lived socket even when an exception (for example from a progress callback) propagates.
		/// NOTE(review): when an exception propagates, the socket-error flag is left unchanged, so a
		/// persistent connection may be reused mid-exchange — confirm whether it should be marked failed.
		/// </remarks>
		/// <param name="send">Complete command bytes to send.</param>
		/// <param name="sendProgress">Optional callback for send acknowledgements.</param>
		/// <param name="handleProgress">Optional callback for server progress messages.</param>
		/// <param name="receiveProgress">Optional callback for receive progress.</param>
		/// <returns>The reply body on success; otherwise a failed result copied from the step that failed.</returns>
		private OperateResult<byte[]> ReadMqttFromCoreServer(byte[] send, Action<long, long> sendProgress, Action<string, string> handleProgress, Action<long, long> receiveProgress)
		{
			OperateResult<byte[]> result = new OperateResult<byte[]>();
			OperateResult<Socket> socketResult = null;
			InteractiveLock.Enter();
			try
			{
				socketResult = GetAvailableSocket();
				if (!socketResult.IsSuccess)
				{
					IsSocketError = true;
					base.AlienSession?.Offline();
					result.CopyErrorFromOther(socketResult);
					return result;
				}

				OperateResult<byte[]> read = ReadMqttFromCoreServer(socketResult.Content, send, sendProgress, handleProgress, receiveProgress);
				if (read.IsSuccess)
				{
					IsSocketError = false;
					result.IsSuccess = read.IsSuccess;
					result.Content = read.Content;
					result.Message = StringResources.Language.SuccessText;
				}
				else
				{
					IsSocketError = true;
					base.AlienSession?.Offline();
					result.CopyErrorFromOther(read);
				}
				ExtraAfterReadFromCoreServer(read);
			}
			finally
			{
				InteractiveLock.Leave();
				if (!isPersistentConn)
				{
					socketResult?.Content?.Close();
				}
			}
			return result;
		}

		/// <inheritdoc />
		/// <remarks>
		/// Forwards to the asynchronous progress-aware exchange with all three progress callbacks disabled.
		/// </remarks>
		public override async Task<OperateResult<byte[]>> ReadFromCoreServerAsync(Socket socket, byte[] send)
			=> await ReadMqttFromCoreServerAsync(socket, send, null, null, null);

		/// <summary>
		/// Asynchronously sends a request over the given socket and reads the reply, reporting progress on the way.
		/// </summary>
		/// <remarks>
		/// The exchange runs in three steps:
		/// 1. the request bytes are sent;
		/// 2. acknowledgement messages are read until the two 64-bit counters in the 16-byte payload
		///    are equal, each pair being forwarded to <paramref name="sendProgress"/>;
		/// 3. messages are read until one arrives whose control type (high nibble of the first byte)
		///    is not 15; messages of type 15 are forwarded to <paramref name="handleProgress"/>.
		/// A type-15 message that cannot be parsed yields a failed result instead of an exception
		/// when a handler is attached; without a handler it is skipped.
		/// </remarks>
		/// <param name="socket">Connected socket to use for the exchange.</param>
		/// <param name="send">Complete command bytes to send.</param>
		/// <param name="sendProgress">Optional callback receiving the two counters of each acknowledgement.</param>
		/// <param name="handleProgress">Optional callback receiving the topic and UTF-8 decoded payload of each type-15 message.</param>
		/// <param name="receiveProgress">Optional callback passed to the receive routine while reading step-3 messages.</param>
		/// <returns>The body of the final message on success; otherwise a failed result describing the first error.</returns>
		private async Task<OperateResult<byte[]>> ReadMqttFromCoreServerAsync(Socket socket, byte[] send, Action<long, long> sendProgress, Action<string, string> handleProgress, Action<long, long> receiveProgress)
		{
			OperateResult sendResult = await SendAsync(socket, send);
			if (!sendResult.IsSuccess)
			{
				return OperateResult.CreateFailedResult<byte[]>(sendResult);
			}

			// Step 2: wait until the acknowledged counter reaches the total.
			long already;
			long total;
			do
			{
				OperateResult<byte, byte[]> ackMessage = await ReceiveMqttMessageAsync(socket);
				if (!ackMessage.IsSuccess)
				{
					return OperateResult.CreateFailedResult<byte[]>(ackMessage);
				}
				OperateResult<string, byte[]> ack = MqttHelper.ExtraMqttReceiveData(ackMessage.Content1, ackMessage.Content2);
				if (!ack.IsSuccess)
				{
					return OperateResult.CreateFailedResult<byte[]>(ack);
				}
				// The acknowledgement payload must be exactly two 64-bit integers.
				if (ack.Content2.Length != 16)
				{
					return new OperateResult<byte[]>(StringResources.Language.ReceiveDataLengthTooShort);
				}
				already = BitConverter.ToInt64(ack.Content2, 0);
				total = BitConverter.ToInt64(ack.Content2, 8);
				sendProgress?.Invoke(already, total);
			}
			while (already != total);

			// Step 3: consume type-15 messages until the actual reply arrives.
			while (true)
			{
				OperateResult<byte, byte[]> reply = await ReceiveMqttMessageAsync(socket, receiveProgress);
				if (!reply.IsSuccess)
				{
					return OperateResult.CreateFailedResult<byte[]>(reply);
				}
				if (reply.Content1 >> 4 != 15)
				{
					return OperateResult.CreateSuccessResult(reply.Content2);
				}
				if (handleProgress == null)
				{
					// The parsed message is only ever consumed by the handler, so there is nothing to do.
					continue;
				}
				OperateResult<string, byte[]> progress = MqttHelper.ExtraMqttReceiveData(reply.Content1, reply.Content2);
				if (!progress.IsSuccess)
				{
					// Previously the unchecked result was decoded directly, which throws when its content is missing.
					return OperateResult.CreateFailedResult<byte[]>(progress);
				}
				handleProgress(progress.Content1, Encoding.UTF8.GetString(progress.Content2));
			}
		}

		/// <summary>
		/// Asynchronously performs one complete request/reply exchange under the interaction lock:
		/// obtains a socket, runs the progress-aware exchange on it and records whether the socket
		/// is still usable.
		/// </summary>
		/// <remarks>
		/// The lock is released, and for non-persistent connections the socket is closed, in a single
		/// <c>finally</c> block. This guarantees exactly one release on every path and closes the
		/// short-lived socket even when an exception (for example from a progress callback) propagates.
		/// NOTE(review): the lock is entered with a synchronous call and released after awaits —
		/// confirm the lock type tolerates release from a different thread.
		/// </remarks>
		/// <param name="send">Complete command bytes to send.</param>
		/// <param name="sendProgress">Optional callback for send acknowledgements.</param>
		/// <param name="handleProgress">Optional callback for server progress messages.</param>
		/// <param name="receiveProgress">Optional callback for receive progress.</param>
		/// <returns>The reply body on success; otherwise a failed result copied from the step that failed.</returns>
		private async Task<OperateResult<byte[]>> ReadMqttFromCoreServerAsync(byte[] send, Action<long, long> sendProgress, Action<string, string> handleProgress, Action<long, long> receiveProgress)
		{
			OperateResult<byte[]> result = new OperateResult<byte[]>();
			OperateResult<Socket> resultSocket = null;
			InteractiveLock.Enter();
			try
			{
				resultSocket = await GetAvailableSocketAsync();
				if (!resultSocket.IsSuccess)
				{
					IsSocketError = true;
					base.AlienSession?.Offline();
					result.CopyErrorFromOther(resultSocket);
					return result;
				}

				OperateResult<byte[]> read = await ReadMqttFromCoreServerAsync(resultSocket.Content, send, sendProgress, handleProgress, receiveProgress);
				if (read.IsSuccess)
				{
					IsSocketError = false;
					result.IsSuccess = read.IsSuccess;
					result.Content = read.Content;
					result.Message = StringResources.Language.SuccessText;
				}
				else
				{
					IsSocketError = true;
					base.AlienSession?.Offline();
					result.CopyErrorFromOther(read);
				}
				ExtraAfterReadFromCoreServer(read);
			}
			finally
			{
				InteractiveLock.Leave();
				if (!isPersistentConn)
				{
					resultSocket?.Content?.Close();
				}
			}
			return result;
		}

		/// <summary>
		/// Sends <paramref name="payload"/> to the server under <paramref name="topic"/> and returns the
		/// topic and payload of the server's reply. Optional callbacks report send progress,
		/// server-side processing progress and receive progress.
		/// </summary>
		/// <remarks>
		/// The progress callbacks make it possible to drive a progress bar for uploads and downloads.
		/// </remarks>
		/// <param name="topic">Topic of the request.</param>
		/// <param name="payload">Payload bytes of the request.</param>
		/// <param name="sendProgress">Send progress: first argument is the amount already sent, second is the total to send.</param>
		/// <param name="handleProgress">Server processing progress: first argument is a server-defined topic (typically a percentage), second is a server-defined message.</param>
		/// <param name="receiveProgress">Receive progress: first argument is the amount already received, second is the total to receive.</param>
		/// <returns>The topic and payload returned by the server, or a failed result.</returns>
		public OperateResult<string, byte[]> Read(string topic, byte[] payload, Action<long, long> sendProgress = null, Action<string, string> handleProgress = null, Action<long, long> receiveProgress = null)
		{
			OperateResult<byte[]> command = MqttHelper.BuildPublishMqttCommand(topic, payload);
			if (command.IsSuccess)
			{
				OperateResult<byte[]> reply = ReadMqttFromCoreServer(command.Content, sendProgress, handleProgress, receiveProgress);
				if (reply.IsSuccess)
				{
					// Split the reply body into topic and payload.
					return MqttHelper.ExtraMqttReceiveData(3, reply.Content);
				}
				return OperateResult.CreateFailedResult<string, byte[]>(reply);
			}
			return OperateResult.CreateFailedResult<string, byte[]>(command);
		}

		/// <summary>
		/// String variant of <c>Read</c>: encodes <paramref name="payload"/> with
		/// <see cref="StringEncoding"/>, sends it under <paramref name="topic"/> and decodes the
		/// returned payload with the same encoding. A null or empty payload is sent as a null byte array.
		/// </summary>
		/// <param name="topic">Topic of the request.</param>
		/// <param name="payload">Payload text of the request.</param>
		/// <param name="sendProgress">Send progress: first argument is the amount already sent, second is the total to send.</param>
		/// <param name="handleProgress">Server processing progress: first argument is a server-defined topic (typically a percentage), second is a server-defined message.</param>
		/// <param name="receiveProgress">Receive progress: first argument is the amount already received, second is the total to receive.</param>
		/// <returns>The topic and decoded payload returned by the server, or a failed result.</returns>
		public OperateResult<string, string> ReadString(string topic, string payload, Action<long, long> sendProgress = null, Action<string, string> handleProgress = null, Action<long, long> receiveProgress = null)
		{
			byte[] body = string.IsNullOrEmpty(payload) ? null : stringEncoding.GetBytes(payload);
			OperateResult<string, byte[]> reply = Read(topic, body, sendProgress, handleProgress, receiveProgress);
			if (reply.IsSuccess)
			{
				string text = stringEncoding.GetString(reply.Content2);
				return OperateResult.CreateSuccessResult(reply.Content1, text);
			}
			return OperateResult.CreateFailedResult<string, string>(reply);
		}

		/// <inheritdoc cref="M:HslCommunication.MQTT.MqttSyncClient.Read(System.String,System.Byte[],System.Action{System.Int64,System.Int64},System.Action{System.String,System.String},System.Action{System.Int64,System.Int64})" />
		public async Task<OperateResult<string, byte[]>> ReadAsync(string topic, byte[] payload, Action<long, long> sendProgress = null, Action<string, string> handleProgress = null, Action<long, long> receiveProgress = null)
		{
			OperateResult<byte[]> publish = MqttHelper.BuildPublishMqttCommand(topic, payload);
			if (publish.IsSuccess)
			{
				OperateResult<byte[]> reply = await ReadMqttFromCoreServerAsync(publish.Content, sendProgress, handleProgress, receiveProgress);
				if (reply.IsSuccess)
				{
					// Split the reply body into topic and payload.
					return MqttHelper.ExtraMqttReceiveData(3, reply.Content);
				}
				return OperateResult.CreateFailedResult<string, byte[]>(reply);
			}
			return OperateResult.CreateFailedResult<string, byte[]>(publish);
		}

		/// <inheritdoc cref="M:HslCommunication.MQTT.MqttSyncClient.ReadString(System.String,System.String,System.Action{System.Int64,System.Int64},System.Action{System.String,System.String},System.Action{System.Int64,System.Int64})" />
		public async Task<OperateResult<string, string>> ReadStringAsync(string topic, string payload, Action<long, long> sendProgress = null, Action<string, string> handleProgress = null, Action<long, long> receiveProgress = null)
		{
			// A null or empty text payload is sent as a null byte array.
			byte[] body = string.IsNullOrEmpty(payload) ? null : stringEncoding.GetBytes(payload);
			OperateResult<string, byte[]> reply = await ReadAsync(topic, body, sendProgress, handleProgress, receiveProgress);
			if (reply.IsSuccess)
			{
				string text = stringEncoding.GetString(reply.Content2);
				return OperateResult.CreateSuccessResult(reply.Content1, text);
			}
			return OperateResult.CreateFailedResult<string, string>(reply);
		}

		/// <inheritdoc />
		/// <remarks>
		/// Formats as <c>MqttSyncClient[ip:port]</c> using the current connection options.
		/// The options can be replaced with null through <see cref="ConnectionOptions"/>; in that
		/// case both parts are rendered empty rather than throwing.
		/// </remarks>
		public override string ToString()
		{
			return $"MqttSyncClient[{connectionOptions?.IpAddress}:{connectionOptions?.Port}]";
		}
	}
}
